home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / p4 / p4-1_2b.lha / p4-1.2b / lib / p4_sock_sr.c < prev    next >
C/C++ Source or Header  |  1993-02-06  |  10KB  |  438 lines

  1. #include "p4.h"
  2. #include "p4_sys.h"
  3.  
  4. #ifdef CAN_DO_XDR
  5. int xdr_send(type, from, to, msg, len, data_type, ack_req)
  6. char *msg;
  7. int type, from, to, len, data_type, ack_req;
  8. {
  9.     int done = 0;
  10.     int nbytes_written = 0;
  11.     int flag, sent, fd, myid, rc, n, nfds;
  12.     struct p4_net_msg_hdr nmsg;
  13.     XDR *xdr_enc;
  14.     xdrproc_t xdr_proc;
  15.     char *xdr_buff;
  16.     int xdr_elsize, els_per_buf, xdr_numels;
  17.     int xdr_len, xdr_len1, len_bytes;
  18.  
  19.     p4_dprintfl(20, "sending msg of type %d from %d to %d via xdr\n",
  20.         type,from,to);
  21.  
  22.     myid = p4_get_my_id();
  23.     fd = p4_local->conntab[to].port;
  24.  
  25.     nmsg.msg_type = p4_i_to_n(type);
  26.     nmsg.to = p4_i_to_n(to);
  27.     nmsg.from = p4_i_to_n(from);
  28.     switch (data_type)
  29.     {
  30.       case P4INT:
  31.     xdr_proc = xdr_int;
  32.     xdr_elsize = XDR_INT_LEN;
  33.     break;
  34.       case P4LNG:
  35.     xdr_proc = xdr_long;
  36.     xdr_elsize = XDR_LNG_LEN;
  37.     break;
  38.       case P4FLT:
  39.     xdr_proc = xdr_float;
  40.     xdr_elsize = XDR_FLT_LEN;
  41.     break;
  42.       case P4DBL:
  43.     xdr_proc = xdr_double;
  44.     xdr_elsize = XDR_DBL_LEN;
  45.     break;
  46.       default:
  47.     p4_dprintf("xdr_send: invalid data type %d\n", data_type);
  48.     return (-1);
  49.     }
  50.     xdr_numels = len / xdr_elsize;
  51.     nmsg.msg_len = p4_i_to_n(xdr_numels);
  52.     nmsg.ack_req = p4_i_to_n(ack_req);
  53.     nmsg.data_type = p4_i_to_n(data_type);
  54.  
  55.     net_send(fd, &nmsg, sizeof(struct p4_net_msg_hdr), FALSE);
  56.  
  57.     xdr_enc = &(p4_local->xdr_enc);
  58.     xdr_buff = p4_local->xdr_buff;
  59.     els_per_buf = (XDR_BUFF_LEN - XDR_PAD) / xdr_elsize;
  60.     while (xdr_numels > 0)
  61.     {
  62.     if (xdr_numels > els_per_buf)
  63.         xdr_len = els_per_buf;
  64.     else
  65.         xdr_len = xdr_numels;
  66.     xdr_len1 = xdr_len;    /* remember xdr_len */
  67.     if (!xdr_setpos(xdr_enc, 0))
  68.     {
  69.         p4_dprintf("xdr_send: xdr_setpos failed\n");
  70.         return (-1);
  71.     }
  72.     if (!xdr_array(xdr_enc, &msg, &xdr_len, XDR_BUFF_LEN,
  73.                xdr_elsize, xdr_proc))
  74.     {
  75.         p4_dprintf("xdr_send: xdr_array failed\n");
  76.         return (-1);
  77.     }
  78.     len_bytes = xdr_getpos(xdr_enc);
  79.  
  80.     flag = (myid < to) ? TRUE : FALSE;
  81.     net_send(fd, xdr_buff, len_bytes, flag);
  82.  
  83.     nbytes_written += len_bytes;
  84.     xdr_numels -= xdr_len1;
  85.     msg = msg + len_bytes - XDR_PAD;
  86.     }
  87.  
  88.     if (ack_req & P4_ACK_REQ_MASK)
  89.     {
  90.     wait_for_ack(fd);
  91.     }
  92.     p4_dprintfl(10, "sent msg of type %d from %d to %d via xdr\n",type,from,to);
  93.     return (nbytes_written);
  94. }
  95. #endif
  96.  
  97. int socket_send(type, from, to, msg, len, data_type, ack_req)
  98. int type, from, to, len, data_type, ack_req;
  99. char *msg;
  100. {
  101.     int fd, flag;
  102.     int sent = 0;
  103.     int done = 0;
  104.     int nleft, rc, n, nfds;
  105.     struct p4_net_msg_hdr nmsg;
  106.  
  107.     p4_dprintfl(20, "sending msg of type %d from %d to %d via socket\n",type,from,to);
  108.     if (CHECKNODE(to) || CHECKNODE(from))
  109.     p4_error("socket_send: bad header: to/from node is out of range",
  110.          to * 10000 + from);
  111.  
  112.     fd = p4_local->conntab[to].port;
  113.  
  114.     nmsg.msg_type = p4_i_to_n(type);
  115.     nmsg.to = p4_i_to_n(to);
  116.     nmsg.from = p4_i_to_n(from);
  117.     nmsg.msg_len = p4_i_to_n(len);
  118.     nmsg.ack_req = p4_i_to_n(ack_req);
  119.     nmsg.data_type = p4_i_to_n(data_type);
  120.  
  121.     net_send(fd, &nmsg, sizeof(struct p4_net_msg_hdr), FALSE);
  122.     p4_dprintfl(20, "sent hdr for type %d from %d to %d via socket\n",type,from,to);
  123.  
  124.     while (sent < len)
  125.     {
  126.     if ((len - sent) > SOCK_BUFF_SIZE)
  127.         nleft = SOCK_BUFF_SIZE;
  128.     else
  129.         nleft = len - sent;
  130.  
  131.     flag = (from < to) ? TRUE : FALSE;
  132.     n = net_send(fd, ((char *) msg) + sent, nleft, flag);
  133.  
  134.     sent += n;
  135.     }
  136.  
  137.     if (ack_req & P4_ACK_REQ_MASK)
  138.     {
  139.     wait_for_ack(fd);
  140.     }
  141.     p4_dprintfl(10, "sent msg of type %d from %d to %d via socket\n",type,from,to);
  142.     return (sent);
  143. }
  144.  
  145.  
  146. struct p4_msg *socket_recv()
  147. {
  148.     int i, fd, nfds;
  149.     struct p4_msg *tmsg = NULL;
  150.     P4BOOL found = FALSE;
  151.     struct timeval tv;
  152.     fd_set read_fds;
  153.  
  154.     while (!found)
  155.     {
  156.     tv.tv_sec = 9;
  157.     tv.tv_usec = 0;  /* RMB */
  158.     FD_ZERO(&read_fds);
  159.     for (i = 0; !tmsg && i < p4_global->num_in_proctable; i++)
  160.     {
  161.         if (p4_local->conntab[i].type == CONN_REMOTE_EST)
  162.         {
  163.         fd = p4_local->conntab[i].port;
  164.         FD_SET(fd, &read_fds);
  165.         }
  166.     }
  167.     SYSCALL_P4(nfds, select(p4_global->max_connections, &read_fds, 0, 0, &tv));
  168.     if (nfds)
  169.     {
  170.         for (i = 0; !tmsg && i < p4_global->num_in_proctable; i++)
  171.         {
  172.         if (p4_local->conntab[i].type == CONN_REMOTE_EST)
  173.         {
  174.             fd = p4_local->conntab[i].port;
  175.             if (FD_ISSET(fd,&read_fds)  &&  sock_msg_avail_on_fd(fd))
  176.             {
  177.             tmsg = socket_recv_on_fd(fd);
  178.             found = TRUE;
  179.             if (tmsg->ack_req & P4_ACK_REQ_MASK)
  180.             {
  181.                 send_ack(fd, tmsg->from);
  182.             }
  183.             }
  184.         }
  185.         }
  186.     }
  187.     }
  188.     return (tmsg);
  189. }
  190.  
  191. struct p4_msg *socket_recv_on_fd(fd)
  192. int fd;
  193. {
  194.     int n, data_type, msg_len;
  195.     struct p4_msg *tmsg;
  196.     struct p4_net_msg_hdr nmsg;
  197.  
  198.     n = net_recv(fd, &nmsg, sizeof(struct p4_net_msg_hdr));
  199.  
  200.     data_type = p4_n_to_i(nmsg.data_type);
  201.     if (data_type == P4NOX)
  202.     msg_len = p4_n_to_i(nmsg.msg_len);
  203.     else
  204.     {
  205.     switch (data_type)
  206.     {
  207.       case P4INT:
  208.         msg_len = p4_n_to_i(nmsg.msg_len) * XDR_INT_LEN;
  209.         break;
  210.       case P4LNG:
  211.         msg_len = p4_n_to_i(nmsg.msg_len) * XDR_LNG_LEN;
  212.         break;
  213.       case P4FLT:
  214.         msg_len = p4_n_to_i(nmsg.msg_len) * XDR_FLT_LEN;
  215.         break;
  216.       case P4DBL:
  217.         msg_len = p4_n_to_i(nmsg.msg_len) * XDR_DBL_LEN;
  218.         break;
  219.       default:
  220.         p4_error("socket_recv_on_fd: invalid data type %d\n", data_type);
  221.     }
  222.     }
  223.  
  224.     tmsg = alloc_p4_msg(msg_len);
  225.     tmsg->type = p4_n_to_i(nmsg.msg_type);
  226.     tmsg->from = p4_n_to_i(nmsg.from);
  227.     tmsg->to = p4_n_to_i(nmsg.to);
  228.     tmsg->len = p4_n_to_i(nmsg.msg_len);    /* chgd by xdr_recv below */
  229.     tmsg->data_type = p4_n_to_i(nmsg.data_type);
  230.     tmsg->ack_req = p4_n_to_i(nmsg.ack_req);
  231.     if (tmsg->data_type == P4NOX || p4_local->conntab[tmsg->from].same_data_rep)
  232.     {
  233.     n = net_recv(fd, (char *) &(tmsg->msg), tmsg->len);
  234.     }
  235.     else
  236.     {
  237. #       ifdef CAN_DO_XDR
  238.     n = xdr_recv(fd, tmsg);
  239. #       else
  240.     p4_error("cannot do xdr recvs\n",0);
  241. #       endif
  242.     }
  243.     return (tmsg);
  244. }
  245.  
  246. P4BOOL socket_msgs_available()
  247. {
  248.     int i, fd;
  249.  
  250.     for (i = 0; i < p4_global->num_in_proctable; i++)
  251.     {
  252.     if (p4_local->conntab[i].type == CONN_REMOTE_EST)
  253.     {
  254.         fd = p4_local->conntab[i].port;
  255.         if (sock_msg_avail_on_fd(fd))
  256.         {
  257.         return (TRUE);
  258.         }
  259.     }
  260.     }
  261.     return (FALSE);
  262. }
  263.  
  264. P4BOOL sock_msg_avail_on_fd(fd)
  265. int fd;
  266. {
  267.     int i, rc, nfds;
  268.     struct timeval tv;
  269.     fd_set read_fds;
  270.     char tempbuf[2];
  271.  
  272.     rc = FALSE;
  273.     tv.tv_sec = 0;
  274.     tv.tv_usec = 0;
  275.     FD_ZERO(&read_fds);
  276.     FD_SET(fd, &read_fds);
  277.     SYSCALL_P4(nfds, select(p4_global->max_connections, &read_fds, 0, 0, &tv));
  278.  
  279.     if (nfds == -1)
  280.     {
  281.     p4_error("sock_msg_avail_on_fd select", nfds);
  282.     }
  283.     if (nfds)            /* true even for eof */
  284.     {
  285.     /* see if data is on the socket or merely an eof condition */
  286.     /* this should not loop long because the select succeeded */
  287.     while ((rc = recv(fd, tempbuf, 1, MSG_PEEK)) == -1)
  288.         ;    
  289.  
  290.     if (rc == 0)    /* if eof */
  291.     {
  292.         /* eof; a process has closed its socket; may have died */
  293.         for (i = 0; i < p4_global->num_in_proctable; i++)
  294.         if (p4_local->conntab[i].port == fd)
  295.             p4_local->conntab[i].type = CONN_REMOTE_DYING;
  296.     }
  297.     else
  298.         rc = TRUE;
  299.     }
  300.     return (rc);
  301. }
  302.  
  303. #ifdef CAN_DO_XDR
  304. int xdr_recv(fd, rmsg)
  305. int fd;
  306. struct p4_msg *rmsg;
  307. {
  308.     xdrproc_t xdr_proc;
  309.     XDR *xdr_dec;
  310.     char *xdr_buff, *msg;
  311.     int i, n;
  312.     int msg_len = 0, nbytes_read = 0;
  313.     int xdr_elsize, els_per_buf, xdr_numels;
  314.     int xdr_len, xdr_len1, len_bytes;
  315.  
  316.     msg = (char *) &(rmsg->msg);
  317.  
  318.     xdr_dec = &(p4_local->xdr_dec);
  319.     xdr_buff = p4_local->xdr_buff;
  320.     switch (rmsg->data_type)
  321.     {
  322.       case P4INT:
  323.     xdr_proc = xdr_int;
  324.     xdr_elsize = XDR_INT_LEN;
  325.     break;
  326.       case P4LNG:
  327.     xdr_proc = xdr_long;
  328.     xdr_elsize = XDR_LNG_LEN;
  329.     break;
  330.       case P4FLT:
  331.     xdr_proc = xdr_float;
  332.     xdr_elsize = XDR_FLT_LEN;
  333.     break;
  334.       case P4DBL:
  335.     xdr_proc = xdr_double;
  336.     xdr_elsize = XDR_DBL_LEN;
  337.     break;
  338.       default:
  339.     p4_dprintf("xdr_recv: invalid data type %d\n", rmsg->data_type);
  340.     return (-1);
  341.     }
  342.     xdr_numels = rmsg->len;
  343.     els_per_buf = (XDR_BUFF_LEN - XDR_PAD) / xdr_elsize;
  344.     while (xdr_numels > 0)
  345.     {
  346.     if (xdr_numels > els_per_buf)
  347.         xdr_len = els_per_buf;
  348.     else
  349.         xdr_len = xdr_numels;
  350.     xdr_len1 = xdr_len;    /* remember xdr_len */
  351.  
  352.     len_bytes = (xdr_len * xdr_elsize) + XDR_PAD;
  353.         p4_dprintfl(90, "xdr_recv: reading %d bytes from %d\n", len_bytes, fd);
  354.     n = net_recv(fd, xdr_buff, len_bytes);
  355.     p4_dprintfl(90, "xdr_recv: read %d bytes \n", n);
  356.  
  357.     if (n < 0)
  358.     {
  359.         p4_error("xdr_recv net_recv", n);
  360.     }
  361.  
  362.     if (!xdr_setpos(xdr_dec, 0))
  363.     {
  364.         p4_dprintf("xdr_recv: xdr_setpos failed\n");
  365.         return (-1);
  366.     }
  367.  
  368.     if (!xdr_array(xdr_dec, &msg, &xdr_len, XDR_BUFF_LEN,
  369.                xdr_elsize, xdr_proc))
  370.     {
  371.         p4_dprintf("xdr_recv: xdr_array failed\n");
  372.         return (-1);
  373.     }
  374.  
  375.     nbytes_read += len_bytes;
  376.     xdr_numels -= xdr_len1;
  377.     msg = msg + len_bytes - XDR_PAD;
  378.     msg_len = msg_len + len_bytes - XDR_PAD;
  379.     }
  380.     rmsg->len = msg_len;
  381.     return (msg_len);
  382. }
  383. #endif
  384.  
  385. P4VOID wait_for_ack(fd)
  386. int fd;
  387. {
  388.     struct p4_msg *ack;
  389.  
  390.     p4_dprintfl(30, "waiting for ack \n");
  391.     ack = socket_recv_on_fd(fd);
  392.     while (!(ack->ack_req & P4_ACK_REPLY_MASK))
  393.     {
  394.     queue_p4_message(ack, p4_local->queued_messages);
  395.     ack = socket_recv_on_fd(fd);
  396.     }
  397.     free_p4_msg(ack);
  398.     p4_dprintfl(30, "received ack from %d\n", ack->from);
  399. }
  400.  
  401. P4VOID send_ack(fd, to)
  402. int fd, to;
  403. {
  404.     struct p4_net_msg_hdr ack;
  405.  
  406.     p4_dprintfl(30, "sending ack to %d\n", to);
  407.     ack.from = p4_i_to_n(p4_get_my_id());
  408.     ack.data_type = p4_i_to_n(P4NOX);
  409.     ack.msg_len = p4_i_to_n(0);
  410.     ack.to = p4_i_to_n(to);
  411.     ack.ack_req = p4_i_to_n(P4_ACK_REPLY_MASK);
  412.     net_send(fd, &ack, sizeof(ack), FALSE);
  413.     p4_dprintfl(30, "sent ack to %d\n", to);
  414. }
  415.  
  416. P4VOID shutdown_p4_socks()
  417. /*
  418.   Shutdown all sockets we know about discarding info
  419.   in either direction.
  420.   */
  421. {
  422.     int i;
  423.  
  424.     if (!p4_local)        /* Need info to be defined */
  425.     return;
  426.     if (!p4_local->conntab)
  427.     return;
  428.     if (p4_local->my_id == LISTENER_ID)
  429.     return;
  430.  
  431.     for (i = 0; i < p4_num_total_ids(); i++)
  432.     if (p4_local->conntab[i].type == CONN_REMOTE_EST)
  433.     {
  434.         (P4VOID) shutdown(p4_local->conntab[i].port, 2);
  435.         (P4VOID) close(p4_local->conntab[i].port);
  436.     }
  437. }
  438.